
package com.pl.process;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import com.pl.configuration.KafkaTopicProperties;
import com.pl.configuration.MQKafkaProperties;
import com.pl.event.AbstractMQEvent;
import com.pl.event.AnonymousBusEvent;
import com.pl.event.BusEvent;
import com.pl.jackson.JavaTimeModule;
import com.pl.process.KafkaMessageProcessor;
import com.pl.process.MQServiceMatcher;
import com.pl.process.ProducerTopicSelector;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.KafkaFuture.BiConsumer;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.env.Environment;
import org.springframework.core.type.filter.AssignableTypeFilter;
import org.springframework.core.type.filter.TypeFilter;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.util.ClassUtils;

public class MQProcessor implements BeanFactoryAware {
    private static final Logger log = LoggerFactory.getLogger(MQProcessor.class);
    /** Spring environment; read for the "project" and "environment" properties when building default bus topic names. */
    @Autowired
    private Environment env;
    /** MQ settings: configured topics, bus switches, bus topic names, partition count and replication factor. */
    @Autowired
    private MQKafkaProperties mqProperties;
    /** Event class to processor lookup; filled by registerTopic and initBus, read by sendEvent. */
    private Map<Class<? extends AbstractMQEvent>, KafkaMessageProcessor> messageProcessorMap = new ConcurrentHashMap();
    /** Injected lazily; not referenced by any method of this class in this file. */
    @Autowired
    @Qualifier("integrationEvaluationContext")
    @Lazy
    private EvaluationContext evaluationContext;
    /** Handed to every KafkaMessageProcessor created by this class. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    /** Used by checkPartitionedTopic to describe topics and to create missing ones. */
    @Autowired
    private AdminClient adminClient;
    /** Used by registerConsumer to build listener containers. */
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;
    /** Receives the events deserialized from Kafka records (publishEvent); its id is also read in sendEvent. */
    @Autowired
    private ApplicationContext context;
    /** Consulted via isFromSelf before a consumed event is published locally. */
    @Autowired
    private MQServiceMatcher serviceMatcher;
    /** JSON mapper built once per instance by createObjectMapper and exposed through getObjectMapper. */
    private ObjectMapper mapper = this.createObjectMapper();
    /** Prefix of the generated group id for consumers that do not share a consumer group. */
    @Value("${spring.application.name}")
    private String applicationName;
    /** Set by init on the first ContextRefreshedEvent so later refresh events do not re-register topics. */
    private volatile boolean isStarted = false;
    /** Identifier generated once when the class is initialized; not read inside this class. */
    public static final String MQ_ID = UUID.fastUUID().toString(true);
    /** Counter behind nextEventId. */
    private static final AtomicLong eventId = new AtomicLong(0L);
    /** Converts the literal "id" into the default partition key expression. */
    @Autowired
    private Converter<String, Expression> spelExpressionConverter;
    /** Package that declares BusEvent; always part of packagesToScan. */
    private static final String DEFAULT_PACKAGE = ClassUtils.getPackageName(BusEvent.class);
    /** Packages searched by findSubTypes; seeded in the static initializer and extended by addPackagesToScan. */
    private static List<String> packagesToScan;
    /** Supplied through setBeanFactory; used to look up ProducerTopicSelector beans by name. */
    private BeanFactory beanFactory;

    /**
     * Builds the Jackson mapper used for Kafka message payloads.
     *
     * <p>The mapper omits {@code null} properties when writing, ignores unknown properties and
     * maps unknown enum constants to {@code null} when reading, formats {@code java.util.Date}
     * values as {@code yyyy-MM-dd HH:mm:ss}, and registers the project's {@code JavaTimeModule}.
     *
     * @return a freshly configured mapper
     */
    private ObjectMapper createObjectMapper() {
        final ObjectMapper jsonMapper = new ObjectMapper();

        // Writing: leave out null-valued properties.
        jsonMapper.setDefaultPropertyInclusion(Include.NON_NULL);

        // Reading: be lenient about fields and enum constants this service does not know.
        jsonMapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
        jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        jsonMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        jsonMapper.registerModule(new JavaTimeModule());
        return jsonMapper;
    }

    /**
     * Creates the processor. Collaborators are field-injected; topic and bus wiring
     * happens later, in the ContextRefreshedEvent listener.
     */
    public MQProcessor() {
    }

    /**
     * Forwards a locally published MQ event to Kafka.
     *
     * <p>The event is sent only when a processor is registered for its exact class and that
     * processor has its producer enabled; otherwise the event is ignored.
     *
     * @param event the event published in the application context
     * @throws JsonProcessingException if the processor fails to serialize the event
     * @throws ExecutionException if the processor reports a failed send
     * @throws InterruptedException if the thread is interrupted while sending
     */
    @EventListener({AbstractMQEvent.class})
    public void sendEvent(AbstractMQEvent event) throws JsonProcessingException, ExecutionException, InterruptedException {
        KafkaMessageProcessor processor = this.messageProcessorMap.get(event.getClass());
        if (processor == null || !processor.isEnableProducer()) {
            return;
        }

        try {
            processor.sendEvent(event);
        } catch (Exception e) {
            // Log which context and which event failed, then let the publisher see the original exception.
            log.warn("{} send event to kafka mq error, event = {}", this.context.getId(), event, e);
            throw e;
        }
    }

    /**
     * Adds packages to the list that is scanned for bus event classes.
     *
     * <p>Packages already present are not added again. A {@code null} array and {@code null}
     * entries are ignored. The backing list is not synchronized here.
     *
     * @param packageArray package names to add; may be {@code null}
     */
    public static void addPackagesToScan(String[] packageArray) {
        if (packageArray == null) {
            return;
        }
        for (String candidate : packageArray) {
            // A null package name cannot be scanned, so it is never recorded.
            if (candidate != null && !packagesToScan.contains(candidate)) {
                packagesToScan.add(candidate);
            }
        }
    }

    /**
     * Scans every package in {@code packagesToScan} for classes assignable to
     * {@code BusEvent} or {@code AnonymousBusEvent} and loads them.
     *
     * @return the loaded event classes; empty when nothing is found or no package is configured
     * @throws IllegalStateException if a discovered class cannot be loaded
     */
    @SuppressWarnings("unchecked") // the include filters only admit AbstractMQEvent bus subtypes
    private Class<? extends AbstractMQEvent>[] findSubTypes() {
        List<Class<? extends AbstractMQEvent>> types = new ArrayList<>();
        if (packagesToScan != null) {
            // The scanner and its filters do not depend on the package, so build them once.
            ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
            provider.addIncludeFilter(new AssignableTypeFilter(BusEvent.class));
            provider.addIncludeFilter(new AssignableTypeFilter(AnonymousBusEvent.class));

            for (String pkg : packagesToScan) {
                Set<BeanDefinition> components = provider.findCandidateComponents(pkg);
                for (BeanDefinition component : components) {
                    try {
                        types.add((Class<? extends AbstractMQEvent>) Class.forName(component.getBeanClassName()));
                    } catch (ClassNotFoundException e) {
                        throw new IllegalStateException("Failed to scan classpath for remote event classes", e);
                    }
                }
            }
        }
        log.debug("Found sub types: {}", types);
        return types.<Class<? extends AbstractMQEvent>>toArray((Class<? extends AbstractMQEvent>[]) new Class[0]);
    }

    /**
     * Runs once, on the first ContextRefreshedEvent: registers every configured topic
     * and then sets up the bus processors. Later refresh events are ignored.
     */
    @EventListener({ContextRefreshedEvent.class})
    private void init() {
        if (this.isStarted) {
            return;
        }
        this.isStarted = true;

        for (KafkaTopicProperties topicConfig : this.mqProperties.getTopic().values()) {
            this.registerTopic(topicConfig);
        }
        this.initBus();
    }

    /**
     * Makes sure {@code topic} exists, creating it when the broker reports it as unknown.
     *
     * <p>A missing topic is created with the partition count and replication factor from
     * {@code mqProperties}. Any other failure of the describe call is logged and treated as
     * "topic usable" (best effort), so the method returns {@code true} in that case.
     *
     * @param topic the topic name to check
     * @return {@code true} when the topic was described, was created, or the describe call
     *         failed for a reason other than an unknown topic
     * @throws ExecutionException if creating the missing topic fails
     * @throws InterruptedException if the thread is interrupted while waiting on the broker;
     *         the interrupt flag is restored before the exception propagates
     */
    protected boolean checkPartitionedTopic(String topic) throws ExecutionException, InterruptedException {
        log.info("check topic {}", topic);
        DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(Collections.singletonList(topic));
        KafkaFuture<TopicDescription> describeFuture = describeTopicsResult.values().get(topic);

        try {
            TopicDescription topicDescription = describeFuture.get();
            log.info("topic info {}", topicDescription);
            return true;
        } catch (InterruptedException e) {
            // An interrupt is not a "topic check" result; do not report success for it.
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                // Best effort: an unexpected describe failure does not block registration.
                log.warn("check topic error", e);
                return true;
            }
        }

        // The broker does not know the topic: create it and wait for the outcome.
        int partitionNum = this.mqProperties.getPartitionNum();
        NewTopic newTopic = new NewTopic(topic, partitionNum, this.mqProperties.getReplicationFactor());
        CreateTopicsResult createTopicsResult = this.adminClient.createTopics(Collections.singletonList(newTopic));
        KafkaFuture<Void> createFuture = createTopicsResult.values().get(topic);
        try {
            createFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } catch (ExecutionException e) {
            log.warn("create topic {} failure", topic, e);
            throw e;
        }
        log.info("create topic {} successful", topic);
        return true;
    }

    /**
     * Convenience overload that registers a consumer without an explicit consumer group id.
     *
     * @param isAutoCommit whether offsets are committed automatically
     * @param isConsumerGroup whether the consumer joins a shared consumer group
     * @param messageClass the event type records are deserialized into
     * @param topicPath the topics to subscribe to
     * @return the started listener container
     */
    private <T extends AbstractMQEvent> KafkaMessageListenerContainer<String, String> registerConsumer(boolean isAutoCommit, boolean isConsumerGroup, Class<T> messageClass, String... topicPath) {
        final String noExplicitGroupId = null;
        return registerConsumer(isAutoCommit, isConsumerGroup, noExplicitGroupId, messageClass, topicPath);
    }

    /**
     * Creates and starts a listener container that turns Kafka records into application events.
     *
     * <p>Group id selection: when {@code isConsumerGroup} is {@code false} a unique group id
     * ({@code applicationName-<random UUID>}) is generated; otherwise {@code consumerGroupId}
     * is used when it is not blank, and no group id is set on the container properties when
     * it is blank.
     *
     * <p>Commit mode: with {@code isAutoCommit == false} auto-commit is disabled, the container
     * uses manual acknowledgment, and a record is acknowledged after it was handled or when its
     * type id is unknown to this service. Otherwise a plain listener is installed.
     *
     * @param isAutoCommit whether offsets are committed automatically
     * @param isConsumerGroup whether the consumer joins a shared consumer group
     * @param consumerGroupId explicit group id; may be blank
     * @param messageClass the event type records are deserialized into
     * @param topicPath the topics to subscribe to
     * @return the started listener container
     */
    private <T extends AbstractMQEvent> KafkaMessageListenerContainer<String, String> registerConsumer(boolean isAutoCommit,
                                                                                                       boolean isConsumerGroup, String consumerGroupId, Class<T> messageClass, String... topicPath) {

        ContainerProperties properties = new ContainerProperties(topicPath);
        // Consumer-group mode is the default; a blank explicit group id leaves the group id unset here.
        if (!isConsumerGroup) {
            String groupId = applicationName + "-" + java.util.UUID.randomUUID().toString();
            properties.setGroupId(groupId);
        } else if (StrUtil.isNotBlank(consumerGroupId)) {
            properties.setGroupId(consumerGroupId);
        }

        if (!isAutoCommit) {
            // Manual commit: disable auto-commit and acknowledge each record explicitly.
            Properties consumerProperties = new Properties();
            consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            properties.setKafkaConsumerProperties(consumerProperties);
            properties.setAckMode(ContainerProperties.AckMode.MANUAL);

            properties.setMessageListener((AcknowledgingMessageListener<String, String>) (m, acknowledgment) ->
                    consumeRecord(m.value(), messageClass, acknowledgment::acknowledge));
        } else {
            // Auto commit: nothing to acknowledge.
            properties.setMessageListener((MessageListener<String, String>) m ->
                    consumeRecord(m.value(), messageClass, () -> { }));
        }

        KafkaMessageListenerContainer<String, String> consumer = new KafkaMessageListenerContainer<>(consumerFactory, properties);
        consumer.start();

        return consumer;
    }

    /**
     * Deserializes one record payload and publishes it locally unless it came from this service.
     *
     * <p>{@code acknowledge} runs after successful handling and when the payload carries a type
     * id this service does not know. Any other failure is logged and the record is left
     * unacknowledged.
     */
    private <T extends AbstractMQEvent> void consumeRecord(String payload, Class<T> messageClass, Runnable acknowledge) {
        try {
            T event = mapper.readValue(payload, messageClass);
            if (!serviceMatcher.isFromSelf(event)) {
                context.publishEvent(event);
            }
            acknowledge.run();
        } catch (InvalidTypeIdException ignored) {
            // This consumer has no class for the record's type id: skip it, but still confirm it.
            acknowledge.run();
        } catch (Exception e) {
            log.warn("message {} ", payload);
            log.warn("consumer message error", e);
        }
    }


    /**
     * Sets up the event-bus processors that are enabled in {@code mqProperties}.
     *
     * <p>The anonymous bus is wired first, then the regular bus; a class matching both base
     * types therefore ends up mapped to the regular bus processor.
     */
    private void initBus() {
        final boolean anonymousBusEnable = this.mqProperties.isAnonymousBusEnable();
        final boolean busEnabled = this.mqProperties.isBusEnabled();
        if (!anonymousBusEnable && !busEnabled) {
            return;
        }

        final Class<? extends AbstractMQEvent>[] subTypes = this.findSubTypes();
        if (anonymousBusEnable) {
            this.bindBusProcessor(true, AnonymousBusEvent.class, subTypes);
        }
        if (busEnabled) {
            this.bindBusProcessor(false, BusEvent.class, subTypes);
        }
    }

    /**
     * Creates one bus processor and maps it to every candidate class assignable to
     * {@code eventBaseType}. Does nothing when the processor could not be created.
     */
    private void bindBusProcessor(boolean anonymousModel, Class<?> eventBaseType, Class<? extends AbstractMQEvent>[] candidates) {
        final KafkaMessageProcessor busProcessor = this.createBusProcessor(anonymousModel);
        if (busProcessor == null) {
            return;
        }
        for (Class<? extends AbstractMQEvent> candidate : candidates) {
            if (eventBaseType.isAssignableFrom(candidate)) {
                this.messageProcessorMap.put(candidate, busProcessor);
            }
        }
    }

    /**
     * Builds the processor for the anonymous bus or the regular bus.
     *
     * <p>The topic is the one configured in {@code mqProperties}; when that is blank it
     * defaults to {@code <project>-<environment>-anonymous-bus} or
     * {@code <project>-<environment>-bus}, with {@code project} defaulting to {@code mos} and
     * {@code environment} to {@code local}. The topic is checked (and created if missing)
     * before a consumer is registered. The anonymous bus consumer does not join a consumer
     * group; the regular bus consumer does.
     *
     * @param anonymousModel {@code true} for the anonymous bus, {@code false} for the regular bus
     * @return the processor, or {@code null} when the topic check fails
     */
    private KafkaMessageProcessor createBusProcessor(boolean anonymousModel) {
        KafkaTopicProperties topicConfig = new KafkaTopicProperties();

        String configuredTopic = anonymousModel ? this.mqProperties.getAnonymousBusTopic() : this.mqProperties.getBusTopic();
        String topic;
        if (StrUtil.isNotBlank(configuredTopic)) {
            topic = configuredTopic;
        } else {
            String suffix = anonymousModel ? "-anonymous-bus" : "-bus";
            topic = this.env.getProperty("project", "mos") + "-" + this.env.getProperty("environment", "local") + suffix;
        }

        if (anonymousModel) {
            topicConfig.setMessageClass(AnonymousBusEvent.class);
        } else {
            topicConfig.setMessageClass(BusEvent.class);
        }
        topicConfig.setTopicName(topic);

        try {
            if (!this.checkPartitionedTopic(topicConfig.getTopicName())) {
                log.warn("check partitioned topic {} error", topic);
                return null;
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread instead of silently dropping it.
            Thread.currentThread().interrupt();
            log.warn("check partitioned topic {} error", topic, e);
            return null;
        } catch (ExecutionException e) {
            log.warn("check partitioned topic {} error", topic, e);
            return null;
        }

        topicConfig.setEnableConsumer(true);
        topicConfig.setEnableProducer(true);
        topicConfig.getProducer().setPartitionKeyExpression(this.spelExpressionConverter.convert("id"));
        KafkaTopicProperties.Consumer consumer = topicConfig.getConsumer();
        consumer.setConsumerGroup(!anonymousModel);

        KafkaMessageListenerContainer<String, String> container = this.registerConsumer(
                consumer.isAutoCommit(), consumer.isConsumerGroup(), topicConfig.getMessageClass(), topicConfig.getTopicName());
        return new KafkaMessageProcessor(this, this.kafkaTemplate, this.serviceMatcher, topicConfig, container, (ProducerTopicSelector) null);
    }

    /**
     * Resolves a {@code ProducerTopicSelector} bean by name.
     *
     * @param producerTopicSelector the bean name; may be blank
     * @return the bean looked up in the bean factory, or {@code null} when the name is blank
     */
    private ProducerTopicSelector producerTopicSelector(String producerTopicSelector) {
        if (!StrUtil.isNotBlank(producerTopicSelector)) {
            return null;
        }
        return this.beanFactory.getBean(producerTopicSelector, ProducerTopicSelector.class);
    }

    /**
     * Registers a processor for one configured topic.
     *
     * <p>Registration is refused when the message class is missing, when it is a bus event
     * type ({@code BusEvent} / {@code AnonymousBusEvent}, which are wired by the bus setup
     * instead), when a topic check fails, or when the consumer is enabled but no topic is
     * configured. When the producer has no partition key expression, {@code "id"} is used.
     * With the consumer enabled, {@code topicName} takes precedence over the consumer's
     * topic list as the subscription.
     *
     * @param topicConfig the topic configuration to register
     * @return {@code true} when a processor was registered for the message class
     */
    public boolean registerTopic(KafkaTopicProperties topicConfig) {
        Class<? extends AbstractMQEvent> messageClass = topicConfig.getMessageClass();
        if (messageClass == null) {
            log.warn("mos.mq.kafka.topic.message-class can't be null");
            return false;
        }
        if (BusEvent.class.isAssignableFrom(messageClass) || AnonymousBusEvent.class.isAssignableFrom(messageClass)) {
            return false;
        }

        KafkaTopicProperties.Consumer consumer = topicConfig.getConsumer();
        try {
            if (StrUtil.isNotBlank(topicConfig.getTopicName()) && !this.checkPartitionedTopic(topicConfig.getTopicName())) {
                return false;
            }
            if (CollectionUtil.isNotEmpty(consumer.getTopicNames())) {
                for (String topicName : consumer.getTopicNames()) {
                    // The return value is not inspected for these topics; only an exception aborts registration.
                    this.checkPartitionedTopic(topicName);
                }
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread instead of silently dropping it.
            Thread.currentThread().interrupt();
            log.warn("check partitioned topic error", e);
            return false;
        } catch (ExecutionException e) {
            log.warn("check partitioned topic error", e);
            return false;
        }

        KafkaTopicProperties.Producer producer = topicConfig.getProducer();
        if (producer.getPartitionKeyExpression() == null) {
            producer.setPartitionKeyExpression(this.spelExpressionConverter.convert("id"));
        }

        KafkaMessageProcessor processor;
        if (topicConfig.isEnableConsumer()) {
            List<String> topics = new ArrayList<>();
            if (StrUtil.isNotBlank(topicConfig.getTopicName())) {
                topics.add(topicConfig.getTopicName());
            } else if (CollectionUtil.isNotEmpty(consumer.getTopicNames())) {
                topics.addAll(consumer.getTopicNames());
            }
            if (topics.isEmpty()) {
                log.warn("no topic configured for message class {}, consumer not registered", messageClass.getName());
                return false;
            }

            String[] topicArray = topics.toArray(new String[0]);
            // Start the consumer before resolving the selector bean, matching the original evaluation order.
            KafkaMessageListenerContainer<String, String> container = this.registerConsumer(
                    consumer.isAutoCommit(), consumer.isConsumerGroup(), consumer.getConsumerGroupId(), topicConfig.getMessageClass(), topicArray);
            processor = new KafkaMessageProcessor(this, this.kafkaTemplate, this.serviceMatcher, topicConfig, container,
                    this.producerTopicSelector(producer.getProducerTopicSelector()));
        } else {
            processor = new KafkaMessageProcessor(this, this.kafkaTemplate, this.serviceMatcher, topicConfig,
                    this.producerTopicSelector(producer.getProducerTopicSelector()));
        }

        this.messageProcessorMap.put(messageClass, processor);
        return true;
    }

    /**
     * Stores the owning bean factory, which is later used to look up
     * {@code ProducerTopicSelector} beans by name.
     *
     * @param beanFactory the bean factory supplied by the container
     * @throws BeansException declared by the {@code BeanFactoryAware} contract; not thrown here
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    /**
     * Exposes the JSON mapper this processor uses for message payloads.
     *
     * @return the shared mapper instance (not a copy)
     */
    public ObjectMapper getObjectMapper() {
        return mapper;
    }

    /**
     * Hands out the next value of the class-wide event counter.
     *
     * @return the counter value after incrementing it by one; the first call returns 1
     */
    public static long nextEventId() {
        return eventId.incrementAndGet();
    }

    static {
        // Start the scan list with the package that declares BusEvent; addPackagesToScan may append more.
        packagesToScan = CollectionUtil.newArrayList(DEFAULT_PACKAGE);
    }
}
